redis怎么实现将消息队列持久化到数据库中

redis怎么实现将消息队列持久化到数据库中,第1张

要看你用的是什么语言,如果是php的,你可以这样考虑,

首先一个是将这两个分为两个队列来实现, 一个用来实现消息优先级,一个来实现定时发送

用的是redis的有序集合,用zadd添加时,将score比做是优先级,也可以用时间戳来当做score,用来表示时间

将消息加入优先级的队列,将1,2替换为时间就是定时发送的队列了

$redis = new Redis()

$redis->connect('127.0.0.1', 6379)

$redis->zAdd('zset1', 1, 'message')

$redis->zAdd('zset1', 2, 'message2')

从队列中取出数据

$redis->zRevRangeByScore('zset1, '+inf', '-inf', array('withscores'=>false, 'limit'=>array(0,20)))

这条语句表示从zset1这个队列里按照score从最大(+inf)到最小(-inf)的排序中取出20条,不带score,如果想要从小到大可以用 zRangeByScore

如果你想让这些都运行在命令行下,可以参考下面来,当然这些是经过删减的

<?php

while (true) {

$pid = pcntl_fork()

if ($pid == -1) {

echo date('Y-m-d H:i:s') . "fork失败!\n"

} else if ($pid == 0) {

$redis = new Redis()

$redis->connect('127.0.0.1', 6379)

$redis->zRevRangeByScore('zset1', '+inf', '-inf', array('withscores'=>false, 'limit'=>array(0,20)))

exit

} else {

pcntl_wait($status)

}

}

pcntl_fork是PHP中的生成子进程,当调用该函数时,会返回一个进程pid,当pid为0时表明是在子进程中,所以把要执行的东西全放这里,这样就实现了

ActiveMQ持久化消息的二种方式;

1、持久化为文件

这个装ActiveMQ时默认就是这种,只要设置消息为持久化就可以了。涉及到的配置和代码有:

<persistenceAdapter>

<kahaDB directory="${activemq.base}/data/kahadb"/>

</persistenceAdapter>

producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue)

2、持久化为MySql

首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar

接下来修改配置文件

<persistenceAdapter>

<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>

</persistenceAdapter>

在配置文件中的broker节点外增加

<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

<property name="driverClassName" value="com.mysql.jdbc.Driver"/>

<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>

<property name="username" value="activemq"/>

<property name="password" value="activemq"/>

<property name="maxActive" value="200"/>

<property name="poolPreparedStatements" value="true"/>

</bean>

从配置中可以看出数据库的名称是activemq,需要手动在MySql中增加这个库。

然后重新启动消息队列,会发现多了3张表

1:activemq_acks

2:activemq_lock

3:activemq_msgs

为什么要用到消息队列?

你这个需求貌似只需要队列这个数据结构就行了

使用JDK中自带的就行,LinkedList是实现Queue的

Queue queue = new LinkedList();

queue.add(Object )//尾部添加

queue.remove()//头部取出

你只需要将通过JDBC把数据库取出的对象用循环依次add到queue, 然后再通过循环依次remove就行了


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6764500.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-28
下一篇 2023-03-28

发表评论

登录后才能评论

评论列表(0条)

保存