无名exchange
channel.basicPublish("","hello",null,message.getBytes());第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey) 绑定 key 指定的,如果它存在的话 临时队列 每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称 的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除
String queueName = channel.queueDeclare().getQueue();绑定 (bindings) 什么是 bingding 呢, binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
Fanout 将接收到的所有消息 广播 到它知道的 所有队列中。系统中默认有些 exchange 类型 Direct exchange 日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误 (errros) ,而不存储哪些警告 (warning) 或信息 (info) 日志消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性 - 它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去。
X 绑定了两个队列,绑定类型是 direct 。队列 Q1 绑定键为 orange , 队列 Q2 绑定键有两个 : 一个绑定键为 black ,另一个绑定键为 green. 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1 。绑定键为 blackgreen 和的消息会被发布到队列 Q2 ,其他消息类型的消息将被丢弃。 多重绑定 当然如果 exchange 的绑定类型是 direct , 但是它绑定的多个队列的 key 如果都相同 ,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了 ,就跟广播差不多
Topic 的要求 发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它 必须是一个单 词列表,以点号分隔开 。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。 在这个规则列表中,其中有两个替换符是大家需要注意的 *(星号 )可以代替一个单词 #(井号 )可以替代零个或多个单词
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception {try (Channel channel = RabbitUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); MapbindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } } }
public class ReceiveLogsTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //声明 Q1 队列与绑定关系 String queueName="Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
public class ReceiveLogsTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //声明 Q2 队列与绑定关系 String queueName="Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }死信队列 某些时候由于特定的 原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列 死信的来源 消息 TTL 过期 队列达到最大长度 ( 队列满了,无法再添加数据到 mq 中 ) 消息被拒绝 (basic.reject 或 basic.nack) 并且 requeue=false 延迟队列 延时队列 , 队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景 1. 订单在十分钟之内未支付则自动取消 2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。 3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。 4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。 5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 RabbitMQ 中的 TTL TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。 消息设置TTL
rabbitremplate.convertAndSend(exchange:"X", routingKey :"XC", message, correlationData −>{ correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData ; }):队列设置TTL
//声明队列的TTI args,put("x-message-tt1") 5000); return QueueBuilder.durable(QUEUEA).withArguments(args),build();如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃 ( 如果配置了死信队列被丢到死信队 列中) ,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为 消息是否过期是在即将投递到消费者 之前判定的 ,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需 要注意的一点是,如果不设置 TTL ,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)