Redis提供了"发布、订阅"模式的消息机制,其中消息订阅者与发布者不直接通信,发布者向指定的频道(channel)发送消息,订阅者从频道拿取消息。
pubsub简单命令介绍-
publish [频道] [消息] 如 publish test yu
-
subscribe [频道] 订阅某个频道的消息
-
pubsub numsub [频道] 查看订阅数
-
unsubscribe [频道] 取消订阅
-
psubscribe ch* 按模式订阅
-
punsubscribe ch* 按模式取消订阅
注意:这种方式是一种发送既忘的原则,如果发送时没有消费者订阅,消息就会丢失,pub/sub作为消息中间件时不能保证消息的可靠。保证消息的可靠性时,不能作为选择。Redis后续推出了Stream的发布订阅功能。
可以使用消费者群组来消费,last_delivered_id来标识该消费者群组消费到了消息的哪个位置。一个消费者群组内部消费者可以有多个,且一个消息仅能被一个消费者群组下面的一个消费者消费。消息的确认ACK,确保消息一定被消费成功。pending_ids[]标识消息已经被消费者拿到了,但是消费者还没有做应答,如果一直不做应答,这个pending_ids(Pending Entries List)会越来越长,不支持消费超时,死信队列。
消息id: 毫秒时间戳-序号 表示消息是在这个时间戳产生的第几个。
Message Conent: 消息内容,String
-
xadd [stream] * [key] [value]…
*表示消息id由redis生成 stream 为stream名称,返回值为消息id,即使出现了时钟回拨,Last_generated_id 记录最大的id,会使-后面的数字+1,也不会导致重复。
-
xlen [stream] 查看stream中消息个数
-
xrange [stream] - + 查看stream中所有消息
-和+可以替换为对应的消息id
-
xdel [stream] [消息id] 删除stream中的指定消息
支持消费者群组,同时也支持单消费者,单消费者用来保证消息的顺序性。 -
xread count 1 streams [stream] 0-0 表示从头开始消费一条消息
-
xread count 1 streams [stream] 消息id 指定id开始
-
xread count 1 streams [stream] $ 从尾部读取消息
-
xread block 0 count 1 streams [stream] $ 阻塞式从尾部读取
消费者群组 -
xgroup create [stream] [group] 0-0
-
xgroup create [stream][group] $
-
xinfo groups [stream]
查看消费者群组的信息 -
xreadgroup GROUP [group] [consumer] count [n] streams [stream] >
创建一个消费者 group 消费者群组名称 , consumer 消费者名称 stream stream名称 > 从哪一个开始读取
xreadgroup GROUP cg1 c1 block 0 count 1 streams stream:test > 阻塞式读取
xinfo consumers [stream] [group]
查看消费者群组中的信息
-
xack [stream] [consumer] [消息id]
确认一条消息
-
xclaim 把消息在多个消费者之间转移
-
xpending [stream] [group] 查看未ack的消息
- 可通过队列实现 lpush brpop
@Component public class ListVer { public static final String RS_LIST_MQ_NS = "rs:list:mq"; @Autowired private JedisPool jedisPool; public void put(String key, String message) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.lpush(RS_LIST_MQ_NS+key, message); } catch (Exception ex) { throw new RuntimeException("发送消息失败"); } finally { jedis.close(); } } public Listget(String key){ Jedis jedis = null; try{ jedis = jedisPool.getResource(); return jedis.brpop(0,RS_LIST_MQ_NS+key); }catch (Exception ex){ throw new RuntimeException("获取消息失败"); }finally { jedis.close(); } } }
- 通过zset实现 一般是延时队列
@Component public class ZsetVer { public static final String RS_ZSET_MQ = "rs:zset:mq"; @Autowired private JedisPool jedisPool; public void sendMsg(String key, String message) { Jedis jedis = null; try { jedis = jedisPool.getResource(); for (int i = 0; i < 5; i++) { String orderNo = "000000000" + i; double expireTime = System.currentTimeMillis() + (i * 1000); jedis.zadd(RS_ZSET_MQ + key, expireTime, orderNo); } } catch (Exception ex) { throw new RuntimeException("发送延迟消息失败"); } finally { jedis.close(); } } public void getMsg(String key){ Jedis jedis = null; try{ jedis = jedisPool.getResource(); while (true){ Setstrings = jedis.zrangeByScore(RS_ZSET_MQ + key, 0, System.currentTimeMillis()); if(strings == null || strings.size() == 0){ Thread.sleep(1000); continue; } String s = strings.iterator().next(); if(jedis.zrem(RS_ZSET_MQ+key, s) > 0){ // 业务逻辑处理,这里删除的线程只有一个,所以只有一个可以处理到,不存在线程安全问题 System.out.println(s); } } }catch (Exception ex){ throw new RuntimeException("消费消息失败"); }finally { jedis.close(); } } }
- 通过pubsub实现 可能丢失消息
@Component public class PSVer extends JedisPubSub { public final static String RS_PS_MQ_NS = "rpsm:"; @Autowired private JedisPool jedisPool; @Override public void onMessage(String channel, String message) { System.out.println("Accept " + channel + ",message:" + message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("Subscribe " + channel + ", count:" + subscribedChannels); } public void publish(String channel, String message) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.publish(RS_PS_MQ_NS + channel, message); System.out.println("发送消息 " + RS_PS_MQ_NS + channel + " message=" + message); } catch (Exception e) { throw new RuntimeException("发布消息失败!"); } finally { jedis.close(); } } @Override public void subscribe(String... channels) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.subscribe(this, channels); } catch (Exception e) { throw new RuntimeException("订阅频道失败"); } finally { jedis.close(); } } }
- 通过stream实现
@Component public class StreamVer { public final static String RS_STREAM_MQ_NS = "rsm:"; @Autowired private JedisPool jedisPool; public final static int MQ_INFO_ConSUMER = 1; public final static int MQ_INFO_GROUP = 2; public final static int MQ_INFO_STREAM = 0; public StreamEntryID produce(String key, Mapmessage) { Jedis jedis = null; try { jedis = jedisPool.getResource(); StreamEntryID streamId = jedis.xadd(RS_STREAM_MQ_NS + key, StreamEntryID.NEW_ENTRY, message); System.out.println("发送消息到 " + RS_STREAM_MQ_NS + "key, 消息id:" + streamId); return streamId; } catch (Exception e) { throw new RuntimeException("发布消息失败!"); } finally { jedis.close(); } } public void createConsumerGroup(String key, String groupName, String lastDeliveredId) { Jedis jedis = null; try { jedis = jedisPool.getResource(); StreamEntryID id = null; if (lastDeliveredId == null) { lastDeliveredId = "0-0"; } id = new StreamEntryID(lastDeliveredId); // makeStream表示没有时是否自动创建stream,但是如果有,再自动创建会异常 jedis.xgroupCreate(RS_STREAM_MQ_NS + key, groupName, id, false); System.out.println("创建消费者群组成功"); } catch (Exception e) { throw new RuntimeException("创建消费者群组失败"); } finally { jedis.close(); } } public List >> consume(String key, String consumerName, String groupName) { Jedis jedis = null; try { jedis = jedisPool.getResource(); XReadGroupParams readGroupParams = new XReadGroupParams().block(0).count(1); Map params = new HashMap<>(); params.put(RS_STREAM_MQ_NS + key, StreamEntryID.UNRECEIVED_ENTRY); List >> result = jedis.xreadGroup(groupName, consumerName, readGroupParams, params); System.out.println(groupName + "从" + RS_STREAM_MQ_NS + key + "接收消息,返回消息:" + result); return result; } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("消息消费失败"); } finally { jedis.close(); } } public void ackMsg(String key, String groupName, StreamEntryID msgId) { Jedis jedis = null; try { jedis = jedisPool.getResource(); System.out.println(jedis.xack(RS_STREAM_MQ_NS + key, groupName, msgId)); System.out.println(RS_STREAM_MQ_NS + key + ",群组 group:" + groupName + "消息已确认"); } catch (Exception e) { throw new RuntimeException("确认失败"); } finally { jedis.close(); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)