muduo网络库学习—EventLoop,Channel,Poller(2)

muduo网络库学习—EventLoop,Channel,Poller(2),第1张

muduo网络库学习(2)

文章目录
  • muduo网络库学习(2)
  • 前言
  • 一、oneloop per thread
  • 二、EventLoop事件循环流程
    • 1.事件循环流程
  • 二、Channel类
  • 三、Poller类
    • 1.select poll epoll
    • 2.代码解析
  • 总结


前言

本章主要记录muduo库中的事件循环功能实现即EventLoop类实现。


一、oneloop per thread

为了保证每一个线程只有一个loop循环,muduo在每个线程中创建了一个变量记录EventLoop的指针。

namespace mduo{
namespace net{
	// 当前线程EventLoop对象指针
	// 线程局部存储
	__thread EventLoop*t_loopThisThread=0;

	//构造函数中
	EventLoop::EventLoop(){
		if(t_loopThisThread){
			//之前改线程已经建立了EventLoop对象
		}
		else{
			t_loopThisThread=this;
		}
	}
	
	EventLoop::~EventLoop()
	{
  		t_loopInThisThread = NULL;
	}

	//判断Eventloop对象执行时候是否在当前线程中
	bool EventLoop::isInLoopThread(){
			return this==t_loopThisThread;
	}
}
}


二、EventLoop事件循环流程 1.事件循环流程

由EventLoop通过loop函数从而调用成员Poller的poll()函数得到当前就绪的IO事件activeChannels返回给EventLoop,EventLoop通过调用对应Channel的handleEvent进行事件处理,周而复始!

代码如下:

// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
  assert(!looping_);
  // 断言当前处于创建该对象的线程中
  assertInLoopThread();
  looping_ = true;
  LOG_TRACE << "EventLoop " << this << " start looping";

  //::poll(NULL, 0, 5*1000);
  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    //++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    //doPendingFunctors();
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "PollPoller::poll()";
  }
  return now;
}

void PollPoller::fillActiveChannels(int numEvents,
                                    ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}
二、Channel类

Channel类不控制文件描述符的生存周期,Channel中的文件描述符可以是
eventfd,timefd,signalfd。

Channel主要功能:
2.1:这个文件描述符产生的事件增加文件描述符

代码如下:

  void setReadCallback(const ReadEventCallback& cb)
  { readCallback_ = cb; }
  void setWriteCallback(const EventCallback& cb)
  { writeCallback_ = cb; }
  void setCloseCallback(const EventCallback& cb)
  { closeCallback_ = cb; }
  void setErrorCallback(const EventCallback& cb)
  { errorCallback_ = cb; }

2.2:向EventLoop中注册事件

代码如下:

void enableReading() { events_ |= kReadEvent; update(); }
// void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
  
void Channel::update()
{
  loop_->updateChannel(this);
}

void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel);
}
	
void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
	// index < 0说明是一个新的通道
    // a new one, add to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // update existing one
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
	// 将一个通道暂时更改为不关注事件,但不从Poller中移除该通道
    if (channel->isNoneEvent())
    {
      // ignore this pollfd
	  // 暂时忽略该文件描述符的事件
	  // 这里pfd.fd 可以直接设置为-1
      pfd.fd = -channel->fd()-1;	// 这样子设置是为了removeChannel优化
    }
  }
}

// 调用这个函数之前确保调用disableAll
void Channel::remove()
{
  assert(isNoneEvent());
  loop_->removeChannel(this);
}
void EventLoop::removeChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  if (eventHandling_)
  {
    assert(currentActiveChannel_ == channel ||
        std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
  }
  poller_->removeChannel(channel);
}

void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  assert(channel->isNoneEvent());
  int idx = channel->index();
  assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
  const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
  assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
  size_t n = channels_.erase(channel->fd());
  assert(n == 1); (void)n;
  if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
  {
    pollfds_.pop_back();
  }
  else
  {
	// 这里移除的算法复杂度是O(1),将待删除元素与最后一个元素交换再pop_back
    int channelAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
    if (channelAtEnd < 0)
    {
      channelAtEnd = -channelAtEnd-1;
    }
    channels_[channelAtEnd]->set_index(idx);
    pollfds_.pop_back();
  }
}


  

3:处理文件描述符对应的事件

代码如下:

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  //只有管道的写端被关闭,读端才会接收到POLLHUP
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

	//无效的文件描述符
  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  //POLLRDHUP对等方关闭或者半关闭read返回为0
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}
三、Poller类

poller类是muduo库中唯一一个基于面向对象思想封装的类,由poller抽象类得到EpollPoller类,PollPoller类,两者区别在于IO复用的方法分别是epoll和poll。

1.select poll epoll

首先介绍一下Linux下三种IO复用系统调用的区别与联系

语法区别:

select(int nfds,fd_set* readfds,fd_set writefds,fd_set exception,struct timeval*timeout)
/*nfds:监听文件描述符最大值+1
fd_set:文件描述符数组
timeout:超时事件
struct timeval{
int s;
int us;
}
*/
poll(struct pollfd*pollfds,nfds_t nfds,int timeout)
/*
struct pollfd{
int fd;//文件描述符
int events;//注册的事件
int revents;//实际就绪的事件,由内核填充
}
nfds:被监听事件集合的大小
timeout:超时时间ms
*/

epoll_wait(int epollfd,struct epoll_event*events,int maxevents,int timeout)

/*
epollfd:标识内核事件表
events:返回的就绪事件
struct epoll_event{
epoll_data_t data;
_uint32_t events;
};

typedef unio epoll_data{
void*ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t
maxevents:最多监听多少个事件
timeout:超时时间
*/

返回值:
超过超时时间没有时间发生返回0;
发生错误返回-1,指定errno;
有事件发生返回事件的个数。

区别:

2.代码解析

抽象类Poller:四个虚函数,分别是用于析构,更新通道监听事件,删除通道对应的事件以及,开始事件监听循环(返回活跃通道),注意在muduo中将文件描述符以及就绪事件,注册事件,对应的read,write,error处理方法封装成了一个Channel类。

class Poller{
public:
typedef std::vector<Channel*> ChannelList;

Poller(Eventloop*loop);
virtual ~Poller();//抽象类必须设置析构函数为虚函数,避免内存泄漏

/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;

/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;

//创建eventloop对应的Poller对象
static Poller* newDefaultPoller(Eventloop*loop);

//断言必须在创建eventloop的线程进行io复用
void assertInLoopThread();

private:
    Eventloop* loop_;//一对一关系
};

PollPoller:
添加通道:通过判断Channel对象的下标来判断之前是否已经加入到pollfd中,若没有则加入了则pollfd数组同时更新map,否则就更新对应pollfd对应events即可。

void PollPoller::updateChannel(Channel* channel){
    //断言在创建Eventloop的线程中
    Poller::assertInLoopThread();
    LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
    if(channel->index()<0){
        //index<0 说明是一个新的通道
        assert(Channels_.find(channel->fd())==Channels_.end());
        struct pollfd pfd;
        pfd.fd=channel->fd();
        pfd.events=channel->events();
        pfd.revents=0;

        Channels_[pfd.fd]=channel;
        pollfds_.push_back(pfd);
        int idx=pollfds_.size()-1;
        channel->set_index(idx);
    }
    else{
        assert(Channels_[channel->fd()]->fd()==channel->fd());
        int idx=channel->index();
        assert(idx>0&&idx<Channels_.size());
        struct pollfd pfd=pollfds_[idx];
        assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
        pfd.events=channel->events();
        pfd.revents=0;
    // 将一个通道暂时更改为不关注事件,但不从Poller中移除该通道
        if(channel->isNoneEvent()){
            // ignore this pollfd
            // 暂时忽略该文件描述符的事件
            // 这里pfd.fd 可以直接设置为-1
            pfd.fd=-pfd.fd-1;
        }  
    }
}

**移除通道:**移除通道之前需要先检测对应的通道是否没有注册事件了,若已经没有注册事件了直接将channel对应的pollfd从数组中删除,同时将map对应元素删除

void PollPoller::removeChannel(Channel* channel){
    Poller::assertInLoopThread();
    LOG_TRACE << "fd = " << channel->fd();
    assert(Channels_.find(channel->fd()) != Channels_.end());
    assert(Channels_[channel->fd()] == channel);
    assert(channel->isNoneEvent());

    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    const struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
    size_t n = Channels_.erase(channel->fd());
    assert(n == 1);
    if(static_cast<size_t>(idx)==pollfds_.size()-1){
        pollfds_.pop_back();
    }
    else{
        // 这里移除的算法复杂度是O(1),将待删除元素与最后一个元素交换再pop_back
        int temp_id=pollfds_[pollfds_.size()-1].fd;
        iter_swap(pollfds_.begin()+idx,pollfds_.begin()+pollfds_.size()-1);
        if (temp_id < 0)
        {
        temp_id = -temp_id-1;
        }
        Channels_[temp_id]->set_index(idx);
        pollfds_.pop_back();
        
    }
}

**IO复用:**遍历pollfd数组中的revents如果大于0,证明有就绪事件发生,同时更新对应的Channel对象中的revents,并将活跃的通达存于ActiveChannels中,返回给EventLoop,EventLoop对活跃通道进行事件回调。

Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels){
    int number=::poll(&(*pollfds_.begin()),pollfds_.size(),timeoutMs);
    if(number<0){
        LOG_SYSERR << "PollPoller::poll()";
    }
    else if(number==0){
        LOG_TRACE << " nothing happended";
    }
    else{
        LOG_TRACE << number << " events happended";
        fillActiveChannels(number, activeChannels);
    }
    return Timestamp::now();
}

void PollPoller::fillActiveChannels(int numevents, ChannelList* activeChannels){
    for(PollfdList::iterator it=pollfds_.begin();it!=pollfds_.end();it++){

        if(it->revents!=0){
            numevents--;
            ChannelMap::iterator temp=Channels_.find(it->fd);
            assert(it->fd==temp->second->fd());
            temp->second->set_revents(it->revents);
            activeChannels->push_back(temp->second);
        }

    }
}

EPollPoller:
EpollPoller相对于PollPoller来说,主要是下标的 *** 作,对于EpollPoller,remove后对于的下标为new,update中如果是更新的话则是add,如果是不关注事件则是delete但是这里没有将通道移除map容器。其余的 *** 作与PollPoller没有区别

enum MOD{
    kNew = -1,
    kAdded = 1,
    kDeleted = 2
};


EpollPoller::EpollPoller(Eventloop*loop):
Poller(loop),
epollfd_(epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
    if(epollfd_<0){
        LOG_SYSFATAL << "EPollPoller::EPollPoller";
    }
}

EpollPoller:: ~EpollPoller(){
    ::close(epollfd_);
}

Timestamp EpollPoller::poll(int timeoutMs, ChannelList* activeChannels){
    int number=epoll_wait(epollfd_,&*events_.begin(),events_.size(),timeoutMs);
    if(number>0){
        LOG_TRACE << number << " events happended";
        fillActiveChannels(number, activeChannels);
        if (static_cast<size_t>(number) == events_.size())
        {
            events_.resize(events_.size()*2);
        }
    }
    else if(number==0){
        LOG_TRACE << " nothing happended";
    }
    else{
        LOG_SYSERR << "EPollPoller::poll()";
    }

    return Timestamp::now();
}


void EpollPoller::fillActiveChannels(int number,ChannelList*activeChannels){

    for(int i=0;i<number;i++){
            //ptr指向的用户数据中包含了fd
            Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
            int fd = channel->fd();
            ChannelMap::const_iterator it = channels_.find(fd);
            assert(it != channels_.end());
            assert(it->second == channel);
            channel->set_revents(events_[i].events);
            activeChannels->push_back(channel);
    }
}

void EpollPoller::updateChannel(Channel* channel){
    Poller::assertInLoopThread();
    LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
    int index=channel->index();
    if(index==MOD::kNew||index==MOD::kDeleted){
        int fd=channel->fd();
        if(index==MOD::kNew){
            assert(channels_.find(fd) == channels_.end());
            channels_[fd] = channel;
        }
        else{
            assert(channels_.find(fd) != channels_.end());
            assert(channels_[fd] == channel);
        }
        channel->set_index(MOD::kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else{
        int fd = channel->fd();
        assert(channels_.find(fd) != channels_.end());
        assert(channels_[fd] == channel);
        assert(index == MOD::kAdded);
        if (channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(MOD::kDeleted);
        }
        else
        {
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

void EpollPoller::removeChannel(Channel* channel){
    Poller::assertInLoopThread();
    int fd = channel->fd();
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(channel->isNoneEvent());
    int index = channel->index();
    assert(index == kAdded || index == kDeleted);
    size_t n = channels_.erase(fd);
    assert(n == 1);

    if (index == kAdded)
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(MOD::kNew);
}

void EpollPoller::update(int operation, Channel* channel){
    struct epoll_event event;
    memset(&event,0,sizeof event);
    event.events = channel->events();
    event.data.ptr = channel;
    int fd = channel->fd();
    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
        if (operation == EPOLL_CTL_DEL)
        {
        LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
        }
        else
        {
        LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
        }
    }
}


总结

本章主要讲述了EventLoop,Channel,以及Poller三者之间的关系,主要是一个IO线程对应一个EventLoop,一个EventLoop对应着一个Poller用于事件循环,一个Poller可以关注多个Channel,返回给EventLoop 活跃的ActiveChannels。注意的是EventLoop中的Channel中更新删除以及loop都必须在创建loop的线程中使用,避免跨线程危险。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/874840.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存