首先一个是将这两个分为两个队列来实现, 一个用来实现消息优先级,一个来实现定时发送
用的是redis的有序集合,用zadd添加时,将score比做是优先级,也可以用时间戳来当做score,用来表示时间
将消息加入优先级的队列,将1,2替换为时间就是定时发送的队列了
$redis = new Redis()
$redis->connect('127.0.0.1', 6379)
$redis->zAdd('zset1', 1, 'message')
$redis->zAdd('zset1', 2, 'message2')
从队列中取出数据
$redis->zRevRangeByScore('zset1, '+inf', '-inf', array('withscores'=>false, 'limit'=>array(0,20)))
这条语句表示从zset1这个队列里按照score从最大(+inf)到最小(-inf)的排序中取出20条,不带score,如果想要从小到大可以用 zRangeByScore
如果你想让这些都运行在命令行下,可以参考下面来,当然这些是经过删减的
<?php
while (true) {
$pid = pcntl_fork()
if ($pid == -1) {
echo date('Y-m-d H:i:s') . "fork失败!\n"
} else if ($pid == 0) {
$redis = new Redis()
$redis->connect('127.0.0.1', 6379)
$redis->zRevRangeByScore('zset1', '+inf', '-inf', array('withscores'=>false, 'limit'=>array(0,20)))
exit
} else {
pcntl_wait($status)
}
}
pcntl_fork是PHP中的生成子进程,当调用该函数时,会返回一个进程pid,当pid为0时表明是在子进程中,所以把要执行的东西全放这里,这样就实现了
ActiveMQ持久化消息的二种方式;1、持久化为文件
这个装ActiveMQ时默认就是这种,只要设置消息为持久化就可以了。涉及到的配置和代码有:
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue)
2、持久化为MySql
首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar
接下来修改配置文件
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>
在配置文件中的broker节点外增加
<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
从配置中可以看出数据库的名称是activemq,需要手动在MySql中增加这个库。
然后重新启动消息队列,会发现多了3张表
1:activemq_acks
2:activemq_lock
3:activemq_msgs
为什么要用到消息队列?你这个需求貌似只需要队列这个数据结构就行了
使用JDK中自带的就行,LinkedList是实现Queue的
Queue queue = new LinkedList();
queue.add(Object )//尾部添加
queue.remove()//头部取出
你只需要将通过JDBC把数据库取出的对象用循环依次add到queue, 然后再通过循环依次remove就行了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)