redis-实现消息队列功能

redis-实现消息队列功能,第1张

redis-实现消息队列功能 发布与订阅

    Redis提供了"发布、订阅"模式的消息机制,其中消息订阅者与发布者不直接通信,发布者向指定的频道(channel)发送消息,订阅者从频道拿取消息。

pubsub简单命令介绍
  • publish [频道] [消息] 如 publish test yu

  • subscribe [频道] 订阅某个频道的消息

  • pubsub numsub [频道] 查看订阅数

  • unsubscribe [频道] 取消订阅

  • psubscribe ch* 按模式订阅

  • punsubscribe ch* 按模式取消订阅
    注意:这种方式是一种发送既忘的原则,如果发送时没有消费者订阅,消息就会丢失,pub/sub作为消息中间件时不能保证消息的可靠。保证消息的可靠性时,不能作为选择。Redis后续推出了Stream的发布订阅功能。

基于stream的发布与订阅

    可以使用消费者群组来消费,last_delivered_id来标识该消费者群组消费到了消息的哪个位置。一个消费者群组内部消费者可以有多个,且一个消息仅能被一个消费者群组下面的一个消费者消费。消息的确认ACK,确保消息一定被消费成功。pending_ids[]标识消息已经被消费者拿到了,但是消费者还没有做应答,如果一直不做应答,这个pending_ids(Pending Entries List)会越来越长,不支持消费超时,死信队列。

消息id: 毫秒时间戳-序号 表示消息是在这个时间戳产生的第几个。
Message Conent: 消息内容,String

基于Stream的 *** 作
  • xadd [stream] * [key] [value]…
    *表示消息id由redis生成 stream 为stream名称,返回值为消息id,即使出现了时钟回拨,Last_generated_id 记录最大的id,会使-后面的数字+1,也不会导致重复。

  • xlen [stream] 查看stream中消息个数

  • xrange [stream] - + 查看stream中所有消息
    -和+可以替换为对应的消息id

  • xdel [stream] [消息id] 删除stream中的指定消息

    支持消费者群组,同时也支持单消费者,单消费者用来保证消息的顺序性。

  • xread count 1 streams [stream] 0-0 表示从头开始消费一条消息

  • xread count 1 streams [stream] 消息id 指定id开始

  • xread count 1 streams [stream] $ 从尾部读取消息

  • xread block 0 count 1 streams [stream] $ 阻塞式从尾部读取

    消费者群组

  • xgroup create [stream] [group] 0-0

  • xgroup create [stream][group] $

  • xinfo groups [stream]
    查看消费者群组的信息

  • xreadgroup GROUP [group] [consumer] count [n] streams [stream] >
    创建一个消费者 group 消费者群组名称 , consumer 消费者名称 stream stream名称 > 从哪一个开始读取

    xreadgroup GROUP cg1 c1 block 0 count 1 streams stream:test > 阻塞式读取
    xinfo consumers [stream] [group]
    查看消费者群组中的信息

  • xack [stream] [consumer] [消息id]
    确认一条消息

  • xclaim 把消息在多个消费者之间转移

  • xpending [stream] [group] 查看未ack的消息

redis实现消息队列的方式
  1. 可通过队列实现 lpush brpop
@Component
public class ListVer {

    public static final String RS_LIST_MQ_NS = "rs:list:mq";

    @Autowired
    private JedisPool jedisPool;

    
    public void put(String key, String message) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.lpush(RS_LIST_MQ_NS+key, message);
        } catch (Exception ex) {
            throw new RuntimeException("发送消息失败");
        } finally {
            jedis.close();
        }
    }

    
    public List get(String key){
        Jedis jedis = null;
        try{
            jedis = jedisPool.getResource();
            return jedis.brpop(0,RS_LIST_MQ_NS+key);
        }catch (Exception ex){
            throw new RuntimeException("获取消息失败");
        }finally {
            jedis.close();
        }
    }
}
  1. 通过zset实现 一般是延时队列
@Component
public class ZsetVer {
    public static final String RS_ZSET_MQ = "rs:zset:mq";

    @Autowired
    private JedisPool jedisPool;

    
    public void sendMsg(String key, String message) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            for (int i = 0; i < 5; i++) {
                String orderNo = "000000000" + i;
                double expireTime = System.currentTimeMillis() + (i * 1000);
                jedis.zadd(RS_ZSET_MQ + key, expireTime, orderNo);
            }
        } catch (Exception ex) {
            throw new RuntimeException("发送延迟消息失败");
        } finally {
            jedis.close();
        }
    }

    
    public void getMsg(String key){
        Jedis jedis = null;
        try{
            jedis = jedisPool.getResource();
            while (true){
                Set strings = jedis.zrangeByScore(RS_ZSET_MQ + key, 0, System.currentTimeMillis());
                if(strings == null || strings.size() == 0){
                    Thread.sleep(1000);
                    continue;
                }
                String s = strings.iterator().next();
                if(jedis.zrem(RS_ZSET_MQ+key, s) > 0){
                    // 业务逻辑处理,这里删除的线程只有一个,所以只有一个可以处理到,不存在线程安全问题
                    System.out.println(s);
                }
            }
        }catch (Exception ex){
            throw new RuntimeException("消费消息失败");
        }finally {
            jedis.close();
        }
    }
}
  1. 通过pubsub实现 可能丢失消息
@Component
public class PSVer extends JedisPubSub {

    public final static String RS_PS_MQ_NS = "rpsm:";

    @Autowired
    private JedisPool jedisPool;

    @Override
    public void onMessage(String channel, String message) {
        System.out.println("Accept " + channel + ",message:" + message);
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("Subscribe " + channel + ", count:" + subscribedChannels);
    }

    public void publish(String channel, String message) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.publish(RS_PS_MQ_NS + channel, message);
            System.out.println("发送消息 " + RS_PS_MQ_NS + channel + " message=" + message);
        } catch (Exception e) {
            throw new RuntimeException("发布消息失败!");
        } finally {
            jedis.close();
        }
    }

    @Override
    public void subscribe(String... channels) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(this, channels);
        } catch (Exception e) {
            throw new RuntimeException("订阅频道失败");
        } finally {
            jedis.close();
        }
    }
}
  1. 通过stream实现
@Component
public class StreamVer {
    public final static String RS_STREAM_MQ_NS = "rsm:";

    @Autowired
    private JedisPool jedisPool;

    public final static int MQ_INFO_ConSUMER = 1;
    public final static int MQ_INFO_GROUP = 2;
    public final static int MQ_INFO_STREAM = 0;

    
    public StreamEntryID produce(String key, Map message) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            StreamEntryID streamId = jedis.xadd(RS_STREAM_MQ_NS + key, StreamEntryID.NEW_ENTRY, message);
            System.out.println("发送消息到 " + RS_STREAM_MQ_NS + "key, 消息id:" + streamId);
            return streamId;
        } catch (Exception e) {
            throw new RuntimeException("发布消息失败!");
        } finally {
            jedis.close();
        }
    }

    
    public void createConsumerGroup(String key, String groupName, String lastDeliveredId) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            StreamEntryID id = null;
            if (lastDeliveredId == null) {
                lastDeliveredId = "0-0";
            }
            id = new StreamEntryID(lastDeliveredId);
            // makeStream表示没有时是否自动创建stream,但是如果有,再自动创建会异常
            jedis.xgroupCreate(RS_STREAM_MQ_NS + key, groupName, id, false);
            System.out.println("创建消费者群组成功");
        } catch (Exception e) {
            throw new RuntimeException("创建消费者群组失败");
        } finally {
            jedis.close();
        }
    }

    
    public List>> consume(String key, String consumerName, String groupName) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            XReadGroupParams readGroupParams = new XReadGroupParams().block(0).count(1);
            Map params = new HashMap<>();
            params.put(RS_STREAM_MQ_NS + key, StreamEntryID.UNRECEIVED_ENTRY);
            List>> result = jedis.xreadGroup(groupName, consumerName, readGroupParams, params);
            System.out.println(groupName + "从" + RS_STREAM_MQ_NS + key + "接收消息,返回消息:" + result);
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("消息消费失败");
        } finally {
            jedis.close();
        }
    }

    
    public void ackMsg(String key, String groupName, StreamEntryID msgId) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            System.out.println(jedis.xack(RS_STREAM_MQ_NS + key, groupName, msgId));
            System.out.println(RS_STREAM_MQ_NS + key + ",群组 group:" + groupName + "消息已确认");
        } catch (Exception e) {
            throw new RuntimeException("确认失败");
        } finally {
            jedis.close();
        }
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4657129.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存