Reactor 模式:
我们普通的函数调用 ,是程序调用某函数 ,函数执行中一直等待该函数执行完之后再继续执行下面的代码。Reactor 模式是一种事件驱动机制。和普通的函数调用不同的是这里的应用程序不是主动的调用某个API函数完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor,如果相应的事件发生,Reactor将主动调用应用 注册的接口,这些函数是回调函数。开始用户会在相应的event中设置回调函数和相应监听句柄并由libevent中的Reactor实例进行管理。
采用Reactor模式是编写高性能网络服务器的必备技术之一:
优点:响应快,不会因为某个同步事件所阻塞,因为采用的是回调函数执行,虽然Reactor本身是同步的。
采用Reactor框架本身与具体事件的处理没有关系,只负责处理与用户的交互,具有很高复用性。
可以扩展多个Reactor实例来实现多CPU的资源利用
因为采用了阻塞的select epoll等IO复用函数进行阻塞监听批量的句柄,所以在事件到来时事件的处理逻辑,也就是回调函数不会阻塞住,而是非阻塞的执行。
应用场景:
1.初始化libevent的实例也就是struct event_base结构体也就是对应的Reactor模型在libevent中的实体
struct event_base *base = event_init()
2.用户初始化所要注册的事件 根据不同的事件,网络中主要包括 定时事件,IO事件,信号事件,libevent中使用宏方便用户根据不同的事件调用与事件名称相匹配的函数,但是内部全部都是调用一个借口event_set(),参数中对于所有时间都会有一个函数指针用于用户注册回调函数,一个句柄(对于IO事件就是文件描述符,信号就是信号的编号,对于定时事件不用设置)
3.将事件本身的基本信息设置好之后要和Reactor的实例也就是和某一个event_base 进行联系,因为可能存在多个event_base 实例
4.基本信息设置完成之后,调用event_add 函数将事件通过Reactor实例也就是struct_base的统一接口找到性能最高的IO复用函数注册到其中,包括设置超时时间。对于定时事件,libevent使用一个小根堆管理,key为超时时间,对于IO和信号事件,将该事件放到等待双向链表中,
5.进入无限循环等待就绪事件,以epoll为例,每次循环前,libevent都会检查定时事件中最小的超时时间tv,根据tv设置epoll的最大等待时间,以便后面及时处理超时事件,当epoll超时返回后就将超时事件添加到就绪队列如果是正确返回就不用添加超时事件,之后同样直接依次遍历就绪队列执行相应的回调函数处理逻辑。此处可以看出是同步处理逻辑的。(IO事件已经在epoll_wait中添加进了就绪队列了)
IO和timer事件的统一:
因为系统提供的IO机制像select或者epoll_wait 都允许程序制定一个最大的等待时间,也称作最大超时时间timeout,即使没有IO事件发生,也能保证能在timeout时间到达时候返回。
根据所有timer事件的最小超时事件来设置系统IO的timeout时间,当系统IO返回时候再激活所有继续的timer事件就可以了,这样就能将timer事件完美的融合到系统的IO机制中去了。这是Reactor 和Proactor模式中处理Timer事件最经典的方法了。
libevent支持多线程:
libevent代码本身不支持多线程,因为源代码没有同步机制。
但是可以采用消息通知机制来支持多线程:
1.暴力抢占:当一个线程正在执行的时候,此时主线程来了一个任务此时立即抢占执行主线程的任务,此时好处是任务可以立即得到处理,但是你必须处理好切换的问题,过多的切换也会为CPU带来效率问题。
2.消息通知机制:当主进程有一个任务需要处理的时候会发送一个消息通知你去执行任务,此时当前进程还是执行自己的任务,在自己的任务执行完后,查看消息说通知有一个任务,再去处理任务,但是通知消息不是立即查看的,没有很好的实时性。
3.消息通知+同步层 :有个折中的处理方式,就是中间增减一个任务队列,这个任务队列是所有线程都可以看到的,每个线程都将新任务扔到这个队列中并且发送一个字符来通知,得到通知的当前线程只是取出其中的一个任务。当然,对于这个任务的 *** 作都是同步的,也就是每一个线程 *** 作要加锁,这就是一个加锁的队列。
Windows下的Memcache安装1. 下载memcache的windows稳定版,解压放某个盘下面,比如在c:\memcached
2. 在终端(也即cmd命令界面)下输入 c:\memcached\memcached.exe -d install--安装memcached成为服务,这样才能正常运行,否则运行失败!
3. 再输入: c:\memcached\memcached.exe -d start--启动memcached的。
以后memcached将作为windows的一个服务每次开机时自动启动。这样服务器端已经安装完毕了。
Linux下的安装:
1.下载memcached和libevent,放到 /tmp 目录下
# cd /tmp
# wget http://www.danga.com/memcached/dist/memcached-1.2.0.tar.gz
# wget http://www.monkey.org/~provos/libevent-1.2.tar.gz
2.先安装libevent:
# tar zxvf libevent-1.2.tar.gz
# cd libevent-1.2
# ./configure –prefix=/usr
# make
# make install
3.测试libevent是否安装成功:
# ls -al /usr/lib | grep libevent
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent-1.2.so.1 ->libevent-1.2.so.1.0.3
-rwxr-xr-x 1 root root 263546 11?? 12 17:38 libevent-1.2.so.1.0.3
-rw-r–r– 1 root root 454156 11?? 12 17:38 libevent.a
-rwxr-xr-x 1 root root 811 11?? 12 17:38 libevent.la
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent.so ->libevent-1.2.so.1.0.3
4.安装memcached,同时需要安装中指定libevent的安装位置:
# cd /tmp
# tar zxvf memcached-1.2.0.tar.gz
# cd memcached-1.2.0
# ./configure –with-libevent=/usr
# make
# make install
如果中间出现报错,请仔细检查错误信息,按照错误信息来配置或者增加相应的库或者路径。
安装完成后会把memcached放到 /usr/local/bin/memcached ,
5.测试是否成功安装memcached:
# ls -al /usr/local/bin/mem*
-rwxr-xr-x 1 root root 137986 11?? 12 17:39 /usr/local/bin/memcached
-rwxr-xr-x 1 root root 140179 11?? 12 17:39 /usr/local/bin/memcached-debug
memcached的基本设置:
1.启动Memcache的服务器端:
# /usr/local/bin/memcached -d -m 10 -u root -l 192.168.0.200 -p 12000 -c 256 -P /tmp/memcached.pid
-d选项是启动一个守护进程,
-m是分配给Memcache使用的内存数量,单位是MB,这里是10MB,
-u是运行Memcache的用户,这里是root,
-l是监听的服务器IP地址,如果有多个地址的话,这里指定了服务器的IP地址192.168.0.200,
-p是设置Memcache监听的端口,这里设置了12000,最好是1024以上的端口,
-c选项是最大运行的并发连接数,默认是1024,这里设置了256,按照你服务器的负载量来设定,
-P是设置保存Memcache的pid文件,这里是保存在 /tmp/memcached.pid,
2.如果要结束Memcache进程,执行:
# kill `cat /tmp/memcached.pid`
也可以启动多个守护进程,不过端口不能重复。
3.重启apache,service httpd restart
java的客户端连接程序:
将java_memcached-release_1.6.zip解压后的目录中的java_memcached-release_2.0.jar文件复制到java项目的lib目录下。
package utils.cache
import java.util.Date
import com.danga.MemCached.MemCachedClient
import com.danga.MemCached.SockIOPool
/**
* 使用memcached的缓存实用类.
*/
public class MemCached
{
// 创建全局的唯一实例
protected static MemCachedClient mcc = new MemCachedClient()
protected static MemCached memCached = new MemCached()
// 设置与缓存服务器的连接池
static {
// 服务器列表和其权重
String[] servers = {"127.0.0.1:11211"}
Integer[] weights = {3}
// 获取socke连接池的实例对象
SockIOPool sockIOPool = SockIOPool.getInstance()
// 设置服务器信息
sockIOPool.setServers( servers )
sockIOPool.setWeights( weights )
// 设置初始连接数、最小和最大连接数以及最大处理时间
sockIOPool.setInitConn( 5 )
sockIOPool.setMinConn( 5 )
sockIOPool.setMaxConn( 250 )
sockIOPool.setMaxIdle( 1000 * 60 * 60 * 6 )
// 设置主线程的睡眠时间
sockIOPool.setMaintSleep( 30 )
// 设置TCP的参数,连接超时等
sockIOPool.setNagle( false )
sockIOPool.setSocketTO( 3000 )
sockIOPool.setSocketConnectTO( 0 )
//sockIOPool.setFailover(bFailover)
//sockIOPool.setAliveCheck(bAliveCheck)
// 初始化连接池
sockIOPool.initialize()
// 压缩设置,超过指定大小(单位为K)的数据都会被压缩
if (memCachedClient == null)
{
mcc = new MemCachedClient(sPoolName)
mcc.setCompressEnable(true)
mcc.setCompressThreshold(4096)
mcc.setPrimitiveAsString(true)
}
}
/*
<h3>基于Spring的配置,如下:</h3>
<pre>
<bean id="memCachedService" class="com.ms.memcached.MemCachedServiceImpl">
<constructor-arg index="0" value="${memcached.pool.name}" />
<constructor-arg index="1" value="${memcached.pool.servers}" />
<constructor-arg index="2" value="${memcached.pool.initConn}" />
<constructor-arg index="3" value="${memcached.pool.maxConn}" />
<constructor-arg index="4" value="${memcached.pool.minConn}" />
<constructor-arg index="5" value="${memcached.pool.socketTO}" />
<constructor-arg index="6" value="${memcached.pool.maintSleep}" />
<constructor-arg index="7" value="${memcached.pool.nagle}" />
<constructor-arg index="8" value="${memcached.pool.failover}" />
<constructor-arg index="9" value="${memcached.pool.aliveCheck}" />
</bean>
</pre>
<h3>利用com.MS.cache.properties来设置参数,如下:</h3>
<pre>
memcached.pool.name = MS
memcached.pool.servers = 192.168.9.132:12000,192.168.9.133:12000
memcached.pool.initConn = 128
memcached.pool.maxConn = 1024
memcached.pool.minConn = 20
memcached.pool.socketTO = 3000
memcached.pool.maintSleep = 30
memcached.pool.nagle = false
memcached.pool.failover = true
memcached.pool.aliveCheck = true
</pre>
*/
/**
* 保护型构造方法,不允许实例化!
*/
protected MemCached()
{
}
/**
* 获取唯一实例.
*/
public static MemCached getInstance()
{
return memCached
}
/**
* 添加一个指定的值到缓存中.
* @param key
* @param value
*/
//新增指定key的缓存内容,但不覆盖已存在的内容。
public boolean add(String key, Object value)
{
return mcc.add(key, value)
}
//expiry过期时间
public boolean add(String key, Object value, Date expiry)
{
return mcc.add(key, value, expiry)
}
//新增或覆盖指定Key的缓存内容
public boolean set(String key, Object value)
{
return mcc.set(key, value)
}
//lExpiry过期时间
public boolean set(String key, Object value, long lExpiry)
{
return mcc.set(key, value, new Date(lExpiry))
}
//根据指定的Key获取缓存内容
public boolean get(String key)
{
return mcc.get(key)
}
//根据指定Key更新缓存内容
public boolean replace(String key, Object value)
{
return mcc.replace(key, value)
}
//lExpiry 指定的时间
public boolean replace(String key, Object value, long lExpiry)
{
return mcc.replace(key, value, new Date(lExpiry))
}
//根据指定Key删除缓存内容
public boolean delete(String key, Object value)
{
return mcc.delete(key, value)
}
//根据指定Key在指定时间后删除缓存内容
public boolean delete(String key, Object value, long lExpiry)
{
return mcc.delete(key, value, new Date(lExpiry))
}
//检测Cache中当前Key是否存在
public boolean exists(String key)
{
return mcc.exists(key)
}
//根据指定一批Key批量获取缓存内容。
/*
* @param sKeys 指定的一批Key。
* @return Object[oValue]
*/
public Object[] getMultiArray(String[] sKeys) throws ServiceException
{
return memCachedClient.getMultiArray(sKeys)
}
/**
* 根据指定一批Key批量获取缓存内容。
*
* @param sKeys 指定的一批Key。
* @return Map<sKey, oValue>
*/
public Map<String, Object>getMulti(String[] sKeys) throws ServiceException
{
return memCachedClient.getMulti(sKeys)
}
public static void main(String[] args)
{
MemCached memCached= MemCached.getInstance()
memCached.add("hello", 234)
System.out.print("get value : " + memCached.get("hello"))
}
}
那么我们就可以通过简单的像main方法中 *** 作的一样存入一个变量,然后再取出进行查看,我们可以看到先调用了add,然后再进行get,我们运行一次 后,234这个值已经被我们存入了memcached的缓存中的了,我们将main方法中红色的那一行注释掉后,我们再运行还是可以看到get到的 value也是234,即缓存中我们已经存在了数据了。
对基本的数据我们可以 *** 作,对于普通的POJO而言,如果要进行存储的话,那么比如让其实现java.io.Serializable接口,因为 memcached是一个分布式的缓存服务器,多台服务器间进行数据共享需要将对象序列化的,所以必须实现该接口,否则会报错的。
Entity
/**
* 获取当前实体的缓存Id
*
* @return
*/
public String getCacheId()
{
return getCacheId(this.getClass(), sBreedId)
}
get
public Breed getBreedById(String sBreedId) throws ServiceException
{
Breed breed = (Breed)memCachedService.get(getCacheId(Breed.class, sBreedId))
if(breed == null)
{
breed = service.get("breed.getBreedById", sBreedId)
if(breed != null)
{
memCachedService.set(breed.getBreedId(), breed)
}
}
return breed
}
save
memCachedService.set(spider.getCacheId(), breed)
update
memCachedService.replace(spider.getCacheId(), breed)
remove
memCachedService.delete(getCacheId(Spider.class, IbreedId))
或
memCachedService.delete(breed.getCacheId())
listAll
public List listAll() throws ServiceException
{
List breeds = new ArrayList ()
List breedIds = (List)memCachedService.get(getKeyByMap("Breed", null))
if(ObjectUtils.isEmpty(breedIds))
{
breeds = service.list("breed.getAllBreed", null)
if (!ObjectUtils.isEmpty(breeds))
{
breedIds = new ArrayList()
for (Breed breed : breeds)
{
breedIds.add(breed.getBreedId())
}
memCachedService.set(getKeyByMap("Breed", null), breedIds)
}
}
else
{
for (String sBreedId : breedIds)
{
Breed breed = getBreedById(sBreedId)
if (breed != null)
{
breeds.add(breed)
}
}
}
return breeds
}
libevent是一个轻量级的基于事件驱动的高性能的开源网络库,并且支持多个平台,对多个平台的I/O复用技术进行了封装,当我们编译库的代码时,编译的脚本将会根据OS支持的处理事件机制,来编译相应的代码,从而在libevent接口上保持一致。
在当前的服务器上,面对的主要问题就是要能处理大量的连接。而通过libevent这个网络库,我们就可以调用它的API来很好的解决上面的问题。首先,可以来回顾一下,对这个问题的传统解决方法。
问题: 如何处理多个客户端连接
解决方案1: I/O复用技术
这几种方式都是同步I/O,即当读写事件就绪,他们自己需要负责进行读写,这个读写过程是阻塞的,而异步I/O则不需要自己负责读写,只需要通知负责读写的程序就可以了。
解决方案2: 多线程技术或多进程技术
多线程技术和多进程技术也可以处理高并发的数据连接,因为在服务器中可以产生大量的进程和线程和处理我们需要监视的连接。但是,这两种方式也是有很大的局限性的,比如多进程模型就不适合大量的短连接,因为进程的产生和关闭需要消耗较大的系统性能,同样,还要进程进程间的通信,在CPU性能不足的情况下不太适合。而多线程技术则不太适合处理长连接,因为当我们建立一个进程时,linux中会消耗8G的栈空间,如果我们的每个连接都杵着不断开,那么大量连接长连接后,导致的结果就是内存的大量消耗。
解决方案3: 常用的上述二者复合使用
上述的两种方法各具有优缺点,因此,我们可以将上述的方法结合起来,这也是目前使用较多的处理高并发的方法。多进程+I/O复用或者多线程+I/O复用。而在具体的实现上,又可以分为很多的方式。比如多线程+I/O复用技术,我们使用使用一个主线程负责监听一个端口和接受的描述符是否有读写事件产生,如果有,则将事件分发给其他的工作进程去完成,这也是进程池的理念。
在说完上述的高并发的处理方法之后,我们可以来介绍一个libevent的主要特色了。
同样,lievent也是采用的上述系统提供的select,poll和epoll方法来进行I/O复用,但是针对于多个系统平台上的不同的I/O复用实现方式,libevent进行了重新的封装,并提供了统一的API接口。libevent在实现上使用了事件驱动这种机制,其本质上是一种Reactor模式。
在Libevent中也是一样,向Libevent框架注册相应的事件和回调函数;当这些事件发生时,Libevent会调用这些回调函数处理相应的事件。
lbevent的事件支持三种,分别是网络IO、定时器和信号。定时器的数据结构使用最小堆(Min Heap),以提高效率。网络IO和信号的数据结构采用了双向链表(TAILQ)。
更多linux内核视频教程文本资料免费获取后台私信【内核】。
libevent的安装很简单,我是直接从github上clone下一个源码,然后进行编译安装的。
具体的命令是(假设你已经安装了git):
现在的libevent版本已经到达libevent2了,其增加了多线程的支持,API函数也发生了一些微小的变化。
如果你想知道更多的API使用情况,请点击这里。
下面,就基于libevent2编写一个聊天室服务器。
设计思想: 首先创建一个套接字,进而创建一个事件对此端口进行监听,将所请求的用户组成一个队列,并监听所有的用户事件,当某个用户说话了,产生了读事件,就将该用户的发言发送给队列中的其他用户。
程序分析
需要包含的libevent函数头:
创建一个client结构体,接受连接后存放数据:
先来看下mian函数的处理:
首先,函数初始化了一个用户队列tailq,接着创建了一个socket套接字,并将套接字设定为非阻塞模式,接着对一个全局的evbase事件集合,注册了事件,事件源是listen_fd,回调函数是on_accept,事件发生的情况是EV_READ,而且标志EV_PESIST表明该事件一直存在,而后开启事件扫描循环event_base_dispatch(evbase)。
再看一下回调函数on_accpet实现:
这个回调函数的作用很显然,就是接受了一个客户端的请求,并申请好了一个client信息,将需要的内容填写好,在填写中需要注意的是,又向上述的事件集evbase中注册了一个bufferevent事件client->buf_ev,并注册了回调函数buffered_on_read,buffered_on_error,这三个函数分别是当接受后的连接发生了读或者错误事件后的执行函数。接着,将用户的client结构放入了用户的队列tailq中去。
用户的buffer可读后的执行函数:
执行函数的作用很明显,将libevent管理中的buffer数据读取出,存入本地的data数组内,然后对队列中的client进行检索,如果不是发数据的client,则将数据写入该client的buffer中,发送给该用户。这里注意的是需要反复读取buffer中的数据,防止一个读取并没有读取干净,直到读取不到数据为止。
buffer出错处理函数和上述函数差不多,功能就是出错后,结束掉保存的client结构,详细就不说了。
编译的时候记得修改Makefile中Libevent文件夹的位置
设计思想: 所谓回显服务器就是将客户端发过来的数据再发回去,这里主要也就是说明libevent的纯IO复用实现。实现方法和上面的差不多,甚至可以说更加简单。
程序和上面的聊天服务器差不多,只是在buffer可读的事件函数中,不是将用户的数据发送给其他用户,而是直接发送给用户本身。
设计思想: 上面的方法单纯使用libevent的简单函数来实现服务,但是这里,我们假设我们需要处理的客户端很少,于是我们可以使用对于每个连接我们分配一个线程这样的方式来实现对用户的服务。这种方式简单有效,一对一服务,就算业务逻辑出现阻塞也不怕。
程序分析
首先定义了一些数据结构,worker数据结构定义的是一个工作者,它包含有一个工作线程,和结束标志,需要获取的工作队列,和建立链表需要的指针。job数据结构定义的是 *** 作一个job的方法和对象,这回到程序中,实际上就是指的是事件发生后,封装好的client结构体和处理这个结构体的方法。workqueue数据结构指的是当前的工作队列中的工作者,以及工作队列中的待完成的工作,以及互斥锁和条件变量(因为多个工作进程需要访问这些资源)。
具体的流程就是,用一个主线程监听一个套接字,并将套接字接受到的连接accept,并创建一个client数据结构保存该连接的信息,在这个client结构中注册一个bufferevent事件,注册到client->evbase上(这时候这是向client中的evbase注册了一个事件还没有进行循环这个事件集)。
接着,当监听到某个client有bufferevent事件发生,主线程就把该client结构体和需要进行的工作方法包装成一个job结构,然后把这个job扔到workqueue上去,并通知各个工作者。而后,各个工作者开着的线程就被激活了,疯狂地去workqueue上去抢工作做,某个worker拿到工作后,就可以解包job,根据job的工作说明书(job_function) *** 作工作对象(client)了。这里,job的工作说明有是循环client中的client->evbase,于是这样线程就会一直去监视这个连接的状态,如果有数据就这会调用回调函数进行处理。同时,这个线程也就是阻塞在这里,这对这一个连接负责。
建立workqueue需要的结构体和函数有:
主线程的on_accept函数为:
job中的工作指南为:
设计思想: 假设我们的用户很多,高并发,长连接,那么我们还是来用I/O复用和线程池实现吧,用一个控制线程通过I/O复用负责监听和分发事件,用一组线程池来进行处理事件,这样就可以灵活地将控制逻辑和业务逻辑分开了,见下述讲解。
程序分析
具体的流程和上面的差不多,用一个主线程监听一个套接字,并将套接字接受到的连接accept,并创建一个client数据结构保存该连接的信息,在这个client结构中注册一个bufferevent事件,但是这里,将事件注册到accept_evbase中,仍然用主线程进行监听。
而面对监听后出现的事件,将client和 *** 作client的方法打包成一个job,放到上述的workqueue中去,让工作进程来完成。这样的 *** 作和上述的差别在于上述方法将bufferevent注册到client中的evbase中,用工作线程监听,而本方法用主线程监听,工作线程负责处理监听产生的事件。
这要的差别在于两个函数 on_accept函数:
在buffered_on_read中,提交job。
在job工作指南server_job_function中就可以做你工作该做的事儿了,根据发来的信息进行数据库处理,http返回等等。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)